﻿using System;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Microestc.EventBus.RabbitMQ
{
    /// <summary>
    /// <see cref="IEventBus"/> implementation that publishes integration events to, and consumes them from,
    /// a RabbitMQ "direct" exchange. The routing key of a message is the event name; handlers are looked up
    /// through the <see cref="IEventBusSubscriptionsManager"/> and resolved from a DI scope per message.
    /// </summary>
    public class EventBusRabbitMQ : IDisposable, IEventBus
    {
        private const string ExchangePrefix = "EVENTBUSEXCHANGE.";
        private const string QueuePrefix = "EVENTBUSQUEUE.";
        private readonly IRabbitMQPersistentConnection _connection;
        private readonly ILogger<EventBusRabbitMQ> _logger;
        private IEventBusSubscriptionsManager SubscriptionsManager { get; }
        private readonly IServiceProvider _serviceProvider;
        private readonly int _retryCount;
        private readonly string _exchange;
        private readonly string _queue;

        // Long-lived channel used for consuming; publishing and (un)binding use short-lived channels.
        private IModel _channel;

        private string Exchange => ExchangePrefix + _exchange;
        private string Queue => QueuePrefix + _queue;

        /// <summary>
        /// Creates the bus, opens the consumer channel (declaring the exchange and the queue) and starts consuming.
        /// </summary>
        /// <param name="connection">Persistent RabbitMQ connection used to create channels.</param>
        /// <param name="subscriptionsManager">Subscription registry; a new <see cref="EventBusSubscriptionsManager"/> is used when null.</param>
        /// <param name="serviceProvider">Root provider from which a scope is created for every processed message.</param>
        /// <param name="options">Settings providing the retry count and the exchange/queue name suffixes.</param>
        /// <param name="logger">Logger for warnings and errors.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="connection"/>, <paramref name="serviceProvider"/>, <paramref name="options"/> (or its value)
        /// or <paramref name="logger"/> is null.
        /// </exception>
        public EventBusRabbitMQ(IRabbitMQPersistentConnection connection, IEventBusSubscriptionsManager subscriptionsManager, IServiceProvider serviceProvider, IOptions<EventBusSettings> options, ILogger<EventBusRabbitMQ> logger)
        {
            _connection = connection ?? throw new ArgumentNullException(nameof(connection));
            SubscriptionsManager = subscriptionsManager ?? new EventBusSubscriptionsManager();
            _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            var settings = options?.Value ?? throw new ArgumentNullException(nameof(options));
            _retryCount = settings.RetryCount;
            _exchange = settings.Exchange;
            _queue = settings.Queue;
            CreateChannel();
            StartBasicConsume();
            SubscriptionsManager.OnEventRemoved += SubsManager_OnEventRemoved;
        }

        /// <summary>
        /// Handles the subscriptions manager's removal notification by unbinding the routing key from the queue.
        /// When no subscriptions remain, the consumer channel is closed.
        /// </summary>
        private void SubsManager_OnEventRemoved(object sender, string routingKey)
        {
            EnsureConnected();

            using (var channel = _connection.CreateModel())
            {
                channel.QueueUnbind(queue: Queue, exchange: Exchange, routingKey: routingKey);

                if (SubscriptionsManager.IsEmpty)
                {
                    _channel.Close();
                }
            }
        }

        /// <summary>
        /// Serializes <paramref name="event"/> to JSON and publishes it as a persistent message,
        /// using the event's full type name as routing key. Publishing is retried with exponential
        /// back-off on <see cref="BrokerUnreachableException"/> and <see cref="SocketException"/>.
        /// </summary>
        /// <param name="event">The integration event to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
        public void Publish(IntegrationEvent @event)
        {
            if (@event == null)
            {
                throw new ArgumentNullException(nameof(@event));
            }

            EnsureConnected();

            var routingKey = @event.GetType().FullName;

            var policy = Policy.Handle<BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                {
                    // Pass the exception separately: using its text as the message template would make the
                    // logger interpret any braces it contains as format placeholders.
                    _logger.LogWarning(ex, "Could not publish event {EventName} after {Timeout}s", routingKey, $"{time.TotalSeconds:n1}");
                });

            using (var channel = _connection.CreateModel())
            {
                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);

                policy.Execute(() =>
                {
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2; // persistent

                    channel.BasicPublish(exchange: Exchange, routingKey: routingKey, mandatory: true, basicProperties: properties, body: body);
                });
            }
        }

        /// <summary>
        /// Binds the queue to <paramref name="eventName"/> (if not already subscribed) and registers a dynamic handler.
        /// </summary>
        public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
        {
            DoInternalSubscription(eventName);
            SubscriptionsManager.AddDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Binds the queue to the key of <typeparamref name="T"/> (if not already subscribed) and registers <typeparamref name="TH"/> as handler.
        /// </summary>
        public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
        {
            var routingKey = SubscriptionsManager.GetEventKey<T>();
            DoInternalSubscription(routingKey);
            SubscriptionsManager.AddSubscription<T, TH>();
        }

        /// <summary>
        /// Non-generic subscribe: uses the full name of <paramref name="eventType"/> as routing key.
        /// </summary>
        public void Subscribe(Type eventType, Type handlerType)
        {
            var routingKey = eventType.FullName;
            DoInternalSubscription(routingKey);
            SubscriptionsManager.AddSubscription(eventType, handlerType);
        }

        /// <summary>
        /// Declares the queue and binds it to the exchange for <paramref name="routingKey"/>,
        /// unless the subscriptions manager already has a subscription for that key.
        /// </summary>
        private void DoInternalSubscription(string routingKey)
        {
            if (SubscriptionsManager.HasSubscriptionsForEvent(routingKey))
            {
                return;
            }

            EnsureConnected();

            using (var channel = _connection.CreateModel())
            {
                channel.QueueDeclare(queue: Queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.QueueBind(queue: Queue, exchange: Exchange, routingKey: routingKey);
            }
        }

        /// <summary>Removes the <typeparamref name="TH"/> subscription for event <typeparamref name="T"/>.</summary>
        public void Unsubscribe<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent
        {
            SubscriptionsManager.RemoveSubscription<T, TH>();
        }

        /// <summary>Removes the dynamic <typeparamref name="TH"/> subscription for <paramref name="eventName"/>.</summary>
        public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
        {
            SubscriptionsManager.RemoveDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Disposes the consumer channel, clears all subscriptions and detaches from the subscriptions manager.
        /// </summary>
        public void Dispose()
        {
            _channel?.Dispose();

            SubscriptionsManager.Clear();

            // Detach last so the manager no longer holds a reference to this disposed instance.
            SubscriptionsManager.OnEventRemoved -= SubsManager_OnEventRemoved;
        }

        #region Helper

        /// <summary>Connects the persistent connection if it is not currently connected.</summary>
        private void EnsureConnected()
        {
            if (!_connection.IsConnected)
            {
                _connection.TryConnect();
            }
        }

        /// <summary>
        /// Creates the consumer channel and declares the exchange and the queue on it.
        /// If a consumer callback throws, the channel is disposed, recreated and consumption is restarted.
        /// </summary>
        private void CreateChannel()
        {
            EnsureConnected();

            _channel = _connection.CreateModel();

            _channel.ExchangeDeclare(exchange: Exchange, type: "direct");

            // The queue must exist before BasicConsume is issued; declare it here with the same
            // arguments used when subscribing so the declaration is idempotent.
            _channel.QueueDeclare(queue: Queue, durable: true, exclusive: false, autoDelete: false, arguments: null);

            _channel.CallbackException += (sender, ea) =>
            {
                _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");

                _channel.Dispose();
                CreateChannel();
                // A new channel has no consumers attached; without this, consumption would stop silently.
                StartBasicConsume();
            };
        }

        /// <summary>
        /// Attaches an asynchronous consumer to the queue on the consumer channel (manual acknowledgements).
        /// </summary>
        private void StartBasicConsume()
        {
            if (_channel != null)
            {
                // NOTE(review): async consumers are only dispatched when the connection factory was created
                // with DispatchConsumersAsync = true; that configuration lives outside this class - confirm.
                var consumer = new AsyncEventingBasicConsumer(_channel);

                consumer.Received += Consumer_Received;
                // Start the consumer.
                _channel.BasicConsume(queue: Queue, autoAck: false, consumer: consumer);
            }
            else
            {
                _logger.LogError("StartBasicConsume can't call on _channel == null");
            }
        }

        /// <summary>
        /// Callback for every delivered message: decodes the body as UTF-8, dispatches it to the handlers
        /// and acknowledges the message regardless of whether processing succeeded.
        /// </summary>
        private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
        {
            var routingKey = eventArgs.RoutingKey;
            var message = Encoding.UTF8.GetString(eventArgs.Body);

            try
            {
                // Test hook: a message containing this marker simulates a processing failure.
                if (message.ToLowerInvariant().Contains("throw-fake-exception"))
                {
                    throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
                }

                await ProcessEvent(routingKey, message);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message);
            }

            // Even on exception we take the message off the queue.
            // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX). 
            // For more information see: https://www.rabbitmq.com/dlx.html
            _channel.BasicAck(eventArgs.DeliveryTag, multiple: false);
        }

        /// <summary>
        /// Invokes every handler registered for <paramref name="routingKey"/> inside a fresh DI scope.
        /// Dynamic handlers receive the message parsed as a <see cref="JObject"/>; typed handlers receive
        /// the message deserialized to the registered event type. Handlers that cannot be resolved are skipped.
        /// </summary>
        /// <param name="routingKey">Event name the message was routed with.</param>
        /// <param name="message">JSON payload of the message.</param>
        private async Task ProcessEvent(string routingKey, string message)
        {
            if (!SubscriptionsManager.HasSubscriptionsForEvent(routingKey))
            {
                _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", routingKey);
                return;
            }

            using (var scope = _serviceProvider.CreateScope())
            {
                var services = scope.ServiceProvider;
                var subscriptions = SubscriptionsManager.GetHandlersForEvent(routingKey);
                foreach (var subscription in subscriptions)
                {
                    if (subscription.IsDynamic)
                    {
                        var handler = services.GetService(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                        if (handler == null) continue;
                        dynamic eventData = JObject.Parse(message);
                        await handler.Handle(eventData);
                    }
                    else
                    {
                        var handler = services.GetService(subscription.HandlerType);
                        if (handler == null) continue;
                        var eventType = SubscriptionsManager.GetEventTypeByName(routingKey);
                        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                        // The handler type is only known at runtime, so Handle is invoked through the
                        // closed generic interface via reflection.
                        var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                        await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                    }
                }
            }
        }

        #endregion
    }
}
